# -*- coding: utf-8 -*-
from datetime import timedelta
from time import time
from utils.operators.rich_sql_sensor import RichSqlSensor
from jms.dm.dm_daily_waybill_weight_difference_sum_dt import jms_dm__dm_dm_daily_waybill_weight_difference_sum_dt


# Doris load label, made unique with the current epoch second.
# NOTE(review): this is computed when the module is imported, not when the
# task runs, so every import of this file yields a different label -- confirm
# that this is the intended uniqueness scheme.
label = f"dws_daily_waybill_weight_difference_sum_{int(time())}"

# Broker-load timeout in seconds. total_seconds() is used instead of .seconds
# because .seconds only holds the sub-day remainder of a timedelta.
timeout = int(timedelta(minutes=40).total_seconds())

# Day offsets (relative to execution_date) of the partitions that are wiped
# and reloaded on every run: -6, -7, ..., -19 (14 days).
_RELOAD_DAY_OFFSETS = range(-6, -20, -1)

# Directory holding one `dt=<date>` sub-directory of Parquet files per day.
_HDFS_SOURCE_DIR = (
    "hdfs://yl-bg-hdp/dw/hive/jms_dm.db/external/"
    "dm_daily_waybill_weight_difference_sum_dt"
)

# Partition names for TRUNCATE, in the order -6 ... -19. The quadrupled braces
# collapse to Jinja `{{ ... }}` expressions once the f-string is evaluated.
_truncate_partitions = ",\n".join(
    f"p{{{{ execution_date | date_add({offset}) | cst_ds_nodash }}}}"
    for offset in _RELOAD_DAY_OFFSETS
)

# Source file globs for LOAD, in the order -19 ... -6. Built from the same
# offsets as the TRUNCATE list so the two can never cover different days.
_load_files = ",\n".join(
    f'"{_HDFS_SOURCE_DIR}/dt={{{{ execution_date | date_add({offset}) | cst_ds }}}}/*"'
    for offset in reversed(_RELOAD_DAY_OFFSETS)
)

# Sensor that truncates the reload window, submits a Doris broker load for the
# same days, then polls SHOW LOAD for the job carrying `label`.
doris_jms_dws__dws_daily_waybill_weight_difference_sum = RichSqlSensor(
    task_id='doris_jms_dws__dws_daily_waybill_weight_difference_sum',
    pool='broker_load_pool',
    email=['houwenlong@jtexpress.com','yl_bigdata@yl-scm.com'],
    task_concurrency=1,
    conn_id='doris',
    pre_sql=f"""
                 TRUNCATE TABLE jms_dws.dws_daily_waybill_weight_difference_sum PARTITION ({_truncate_partitions});
                 LOAD LABEL jms_dws.{label} (
                     DATA INFILE({_load_files})
                     INTO TABLE dws_daily_waybill_weight_difference_sum
                     FORMAT AS 'PARQUET'
                 )
                 WITH BROKER '{{{{ var.json.doris_brokers | random_choice }}}}'
                 PROPERTIES ('timeout'='{timeout}', 'max_filter_ratio'='0.0')""",
    poke_sql=f"SHOW LOAD FROM jms_dws WHERE label = '{label}' ORDER BY CreateTime DESC LIMIT 1",
    sql_on_kill=f"CANCEL LOAD FROM jms_dws WHERE LABEL = '{label}'",
    # The third column of the SHOW LOAD row is compared against the job state.
    success=lambda r: r[2] == 'FINISHED',
    # Returns (failed?, message); presumably r[7] is the ErrorMsg column of
    # SHOW LOAD -- verify against the Doris version in use.
    failure=lambda r: (r[2] == 'CANCELLED', str(r[7])),
    poke_interval=60,
    # Allow two minutes beyond the load's own timeout before the task is killed.
    execution_timeout=timedelta(seconds=timeout + 120),
)

# The load runs only after the upstream dm table task.
doris_jms_dws__dws_daily_waybill_weight_difference_sum << jms_dm__dm_dm_daily_waybill_weight_difference_sum_dt
